{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "DjUA6S30k52h"
      },
      "source": [
        "##### Copyright 2021 The TensorFlow Authors."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "SpNWyqewk8fE"
      },
      "outputs": [],
      "source": [
        "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "# https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "6x1ypzczQCwy"
      },
      "source": [
        "# Simple TFX Pipeline Tutorial using Penguin dataset\n",
        "\n",
        "***A Short tutorial to run a simple TFX pipeline.***"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "HU9YYythm0dx"
      },
      "source": [
        "Note: We recommend running this tutorial in a Colab notebook, with no setup required!  Just click \"Run in Google Colab\".\n",
        "\n",
        "<div class=\"devsite-table-wrapper\"><table class=\"tfo-notebook-buttons\" align=\"left\">\n",
        "<td><a target=\"_blank\" href=\"https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple\">\n",
        "<img src=\"https://www.tensorflow.org/images/tf_logo_32px.png\"/>View on TensorFlow.org</a></td>\n",
        "<td><a target=\"_blank\" href=\"https://colab.research.google.com/github/tensorflow/tfx/blob/master/docs/tutorials/tfx/penguin_simple.ipynb\">\n",
        "<img src=\"https://www.tensorflow.org/images/colab_logo_32px.png\">Run in Google Colab</a></td>\n",
        "<td><a target=\"_blank\" href=\"https://github.com/tensorflow/tfx/tree/master/docs/tutorials/tfx/penguin_simple.ipynb\">\n",
        "<img width=32px src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\">View source on GitHub</a></td>\n",
        "<td><a href=\"https://storage.googleapis.com/tensorflow_docs/tfx/docs/tutorials/tfx/penguin_simple.ipynb\"><img src=\"https://www.tensorflow.org/images/download_logo_32px.png\" />Download notebook</a></td>\n",
        "</table></div>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_VuwrlnvQJ5k"
      },
      "source": [
        "In this notebook-based tutorial, we will create and run a TFX pipeline\n",
        "for a simple classification model.\n",
        "The pipeline will consist of three essential TFX components: ExampleGen,\n",
        "Trainer and Pusher. The pipeline includes the most minimal ML workflow like\n",
        "importing data, training a model and exporting the trained model.\n",
        "\n",
        "Please see\n",
        "[Understanding TFX Pipelines](https://www.tensorflow.org/tfx/guide/understanding_tfx_pipelines)\n",
        "to learn more about various concepts in TFX."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Fmgi8ZvQkScg"
      },
      "source": [
        "## Set Up\n",
        "We first need to install the TFX Python package and download\n",
        "the dataset which we will use for our model.\n",
        "\n",
        "### Upgrade Pip\n",
        "\n",
        "To avoid upgrading Pip in a system when running locally,\n",
        "check to make sure that we are running in Colab.\n",
        "Local systems can of course be upgraded separately."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "as4OTe2ukSqm"
      },
      "outputs": [],
      "source": [
        "try:\n",
        "  import colab\n",
        "  !pip install --upgrade pip\n",
        "except:\n",
        "  pass"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MZOYTt1RW4TK"
      },
      "source": [
        "### Install TFX\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "iyQtljP-qPHY"
      },
      "outputs": [],
      "source": [
        "!pip install -U tfx"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "EwT0nov5QO1M"
      },
      "source": [
        "### Did you restart the runtime?\n",
        "\n",
        "If you are using Google Colab, the first time that you run\n",
        "the cell above, you must restart the runtime by clicking\n",
        "above \"RESTART RUNTIME\" button or using \"Runtime > Restart\n",
        "runtime ...\" menu. This is because of the way that Colab\n",
        "loads packages."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "BDnPgN8UJtzN"
      },
      "source": [
        "Check the TensorFlow and TFX versions."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "6jh7vKSRqPHb"
      },
      "outputs": [],
      "source": [
        "import tensorflow as tf\n",
        "print('TensorFlow version: {}'.format(tf.__version__))\n",
        "from tfx import v1 as tfx\n",
        "print('TFX version: {}'.format(tfx.__version__))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "aDtLdSkvqPHe"
      },
      "source": [
        "### Set up variables\n",
        "\n",
        "There are some variables used to define a pipeline. You can customize these\n",
        "variables as you want. By default all output from the pipeline will be\n",
        "generated under the current directory."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "EcUseqJaE2XN"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "\n",
        "PIPELINE_NAME = \"penguin-simple\"\n",
        "\n",
        "# Output directory to store artifacts generated from the pipeline.\n",
        "PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)\n",
        "# Path to a SQLite DB file to use as an MLMD storage.\n",
        "METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')\n",
        "# Output directory where created models from the pipeline will be exported.\n",
        "SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)\n",
        "\n",
        "from absl import logging\n",
        "logging.set_verbosity(logging.INFO)  # Set default logging level."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8F2SRwRLSYGa"
      },
      "source": [
        "### Prepare example data\n",
        "We will download the example dataset for use in our TFX pipeline. The dataset we\n",
        "are using is\n",
        "[Palmer Penguins dataset](https://allisonhorst.github.io/palmerpenguins/articles/intro.html)\n",
        "which is also used in other\n",
        "[TFX examples](https://github.com/tensorflow/tfx/tree/master/tfx/examples/penguin).\n",
        "\n",
        "There are four numeric features in this dataset:\n",
        "\n",
        "- culmen_length_mm\n",
        "- culmen_depth_mm\n",
        "- flipper_length_mm\n",
        "- body_mass_g\n",
        "\n",
        "All features were already normalized to have range [0,1]. We will build a\n",
        "classification model which predicts the `species` of penguins."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "11J7XiCq6AFP"
      },
      "source": [
        "Because TFX ExampleGen reads inputs from a directory, we need to create a\n",
        "directory and copy dataset to it."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "4fxMs6u86acP"
      },
      "outputs": [],
      "source": [
        "import urllib.request\n",
        "import tempfile\n",
        "\n",
        "DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.\n",
        "_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'\n",
        "_data_filepath = os.path.join(DATA_ROOT, \"data.csv\")\n",
        "urllib.request.urlretrieve(_data_url, _data_filepath)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ASpoNmxKSQjI"
      },
      "source": [
        "Take a quick look at the CSV file."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "-eSz28UDSnlG"
      },
      "outputs": [],
      "source": [
        "!head {_data_filepath}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "OTtQNq1DdVvG"
      },
      "source": [
        "You should be able to see five values. `species` is one of 0, 1 or 2, and all\n",
        "other features should have values between 0 and 1."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "nH6gizcpSwWV"
      },
      "source": [
        "## Create a pipeline\n",
        "\n",
        "TFX pipelines are defined using Python APIs. We will define a pipeline which\n",
        "consists of following three components.\n",
        "- CsvExampleGen: Reads in data files and convert them to TFX internal format\n",
        "for further processing. There are multiple\n",
        "[ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)s for various\n",
        "formats. In this tutorial, we will use CsvExampleGen which takes CSV file input.\n",
        "- Trainer: Trains an ML model.\n",
        "[Trainer component](https://www.tensorflow.org/tfx/guide/trainer) requires a\n",
        "model definition code from users. You can use TensorFlow APIs to specify how to\n",
        "train a model and save it in a _saved_model_ format.\n",
        "- Pusher: Copies the trained model outside of the TFX pipeline.\n",
        "[Pusher component](https://www.tensorflow.org/tfx/guide/pusher) can be thought\n",
        "of an deployment process of the trained ML model.\n",
        "\n",
        "Before actually define the pipeline, we need to write a model code for the\n",
        "Trainer component first."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "lOjDv93eS5xV"
      },
      "source": [
        "### Write model training code\n",
        "\n",
        "We will create a simple DNN model for classification using TensorFlow Keras\n",
        "API. This model training code will be saved to a separate file.\n",
        "\n",
        "In this tutorial we will use\n",
        "[Generic Trainer](https://www.tensorflow.org/tfx/guide/trainer#generic_trainer)\n",
        "of TFX which support Keras-based models. You need to write a Python file\n",
        "containing `run_fn` function, which is the entrypoint for the `Trainer`\n",
        "component."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "aES7Hv5QTDK3"
      },
      "outputs": [],
      "source": [
        "_trainer_module_file = 'penguin_trainer.py'"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Gnc67uQNTDfW"
      },
      "outputs": [],
      "source": [
        "%%writefile {_trainer_module_file}\n",
        "\n",
        "from typing import List\n",
        "from absl import logging\n",
        "import tensorflow as tf\n",
        "from tensorflow import keras\n",
        "from tensorflow_transform.tf_metadata import schema_utils\n",
        "\n",
        "from tfx import v1 as tfx\n",
        "from tfx_bsl.public import tfxio\n",
        "from tensorflow_metadata.proto.v0 import schema_pb2\n",
        "\n",
        "_FEATURE_KEYS = [\n",
        "    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'\n",
        "]\n",
        "_LABEL_KEY = 'species'\n",
        "\n",
        "_TRAIN_BATCH_SIZE = 20\n",
        "_EVAL_BATCH_SIZE = 10\n",
        "\n",
        "# Since we're not generating or creating a schema, we will instead create\n",
        "# a feature spec.  Since there are a fairly small number of features this is\n",
        "# manageable for this dataset.\n",
        "_FEATURE_SPEC = {\n",
        "    **{\n",
        "        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)\n",
        "           for feature in _FEATURE_KEYS\n",
        "       },\n",
        "    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)\n",
        "}\n",
        "\n",
        "\n",
        "def _input_fn(file_pattern: List[str],\n",
        "              data_accessor: tfx.components.DataAccessor,\n",
        "              schema: schema_pb2.Schema,\n",
        "              batch_size: int = 200) -> tf.data.Dataset:\n",
        "  \"\"\"Generates features and label for training.\n",
        "\n",
        "  Args:\n",
        "    file_pattern: List of paths or patterns of input tfrecord files.\n",
        "    data_accessor: DataAccessor for converting input to RecordBatch.\n",
        "    schema: schema of the input data.\n",
        "    batch_size: representing the number of consecutive elements of returned\n",
        "      dataset to combine in a single batch\n",
        "\n",
        "  Returns:\n",
        "    A dataset that contains (features, indices) tuple where features is a\n",
        "      dictionary of Tensors, and indices is a single Tensor of label indices.\n",
        "  \"\"\"\n",
        "  return data_accessor.tf_dataset_factory(\n",
        "      file_pattern,\n",
        "      tfxio.TensorFlowDatasetOptions(\n",
        "          batch_size=batch_size, label_key=_LABEL_KEY),\n",
        "      schema=schema).repeat()\n",
        "\n",
        "\n",
        "def _build_keras_model() -> tf.keras.Model:\n",
        "  \"\"\"Creates a DNN Keras model for classifying penguin data.\n",
        "\n",
        "  Returns:\n",
        "    A Keras Model.\n",
        "  \"\"\"\n",
        "  # The model below is built with Functional API, please refer to\n",
        "  # https://www.tensorflow.org/guide/keras/overview for all API options.\n",
        "  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]\n",
        "  d = keras.layers.concatenate(inputs)\n",
        "  for _ in range(2):\n",
        "    d = keras.layers.Dense(8, activation='relu')(d)\n",
        "  outputs = keras.layers.Dense(3)(d)\n",
        "\n",
        "  model = keras.Model(inputs=inputs, outputs=outputs)\n",
        "  model.compile(\n",
        "      optimizer=keras.optimizers.Adam(1e-2),\n",
        "      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n",
        "      metrics=[keras.metrics.SparseCategoricalAccuracy()])\n",
        "\n",
        "  model.summary(print_fn=logging.info)\n",
        "  return model\n",
        "\n",
        "\n",
        "# TFX Trainer will call this function.\n",
        "def run_fn(fn_args: tfx.components.FnArgs):\n",
        "  \"\"\"Train the model based on given args.\n",
        "\n",
        "  Args:\n",
        "    fn_args: Holds args used to train the model as name/value pairs.\n",
        "  \"\"\"\n",
        "\n",
        "  # This schema is usually either an output of SchemaGen or a manually-curated\n",
        "  # version provided by pipeline author. A schema can also derived from TFT\n",
        "  # graph if a Transform component is used. In the case when either is missing,\n",
        "  # `schema_from_feature_spec` could be used to generate schema from very simple\n",
        "  # feature_spec, but the schema returned would be very primitive.\n",
        "  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)\n",
        "\n",
        "  train_dataset = _input_fn(\n",
        "      fn_args.train_files,\n",
        "      fn_args.data_accessor,\n",
        "      schema,\n",
        "      batch_size=_TRAIN_BATCH_SIZE)\n",
        "  eval_dataset = _input_fn(\n",
        "      fn_args.eval_files,\n",
        "      fn_args.data_accessor,\n",
        "      schema,\n",
        "      batch_size=_EVAL_BATCH_SIZE)\n",
        "\n",
        "  model = _build_keras_model()\n",
        "  model.fit(\n",
        "      train_dataset,\n",
        "      steps_per_epoch=fn_args.train_steps,\n",
        "      validation_data=eval_dataset,\n",
        "      validation_steps=fn_args.eval_steps)\n",
        "\n",
        "  # The result of the training should be saved in `fn_args.serving_model_dir`\n",
        "  # directory.\n",
        "  model.save(fn_args.serving_model_dir, save_format='tf')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "blaw0rs-emEf"
      },
      "source": [
        "Now you have completed all preparation steps to build a TFX pipeline."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "w3OkNz3gTLwM"
      },
      "source": [
        "### Write a pipeline definition\n",
        "\n",
        "We define a function to create a TFX pipeline. A `Pipeline` object\n",
        "represents a TFX pipeline which can be run using one of pipeline\n",
        "orchestration systems that TFX supports.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "M49yYVNBTPd4"
      },
      "outputs": [],
      "source": [
        "def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,\n",
        "                     module_file: str, serving_model_dir: str,\n",
        "                     metadata_path: str) -> tfx.dsl.Pipeline:\n",
        "  \"\"\"Creates a three component penguin pipeline with TFX.\"\"\"\n",
        "  # Brings data into the pipeline.\n",
        "  example_gen = tfx.components.CsvExampleGen(input_base=data_root)\n",
        "\n",
        "  # Uses user-provided Python function that trains a model.\n",
        "  trainer = tfx.components.Trainer(\n",
        "      module_file=module_file,\n",
        "      examples=example_gen.outputs['examples'],\n",
        "      train_args=tfx.proto.TrainArgs(num_steps=100),\n",
        "      eval_args=tfx.proto.EvalArgs(num_steps=5))\n",
        "\n",
        "  # Pushes the model to a filesystem destination.\n",
        "  pusher = tfx.components.Pusher(\n",
        "      model=trainer.outputs['model'],\n",
        "      push_destination=tfx.proto.PushDestination(\n",
        "          filesystem=tfx.proto.PushDestination.Filesystem(\n",
        "              base_directory=serving_model_dir)))\n",
        "\n",
        "  # Following three components will be included in the pipeline.\n",
        "  components = [\n",
        "      example_gen,\n",
        "      trainer,\n",
        "      pusher,\n",
        "  ]\n",
        "\n",
        "  return tfx.dsl.Pipeline(\n",
        "      pipeline_name=pipeline_name,\n",
        "      pipeline_root=pipeline_root,\n",
        "      metadata_connection_config=tfx.orchestration.metadata\n",
        "      .sqlite_metadata_connection_config(metadata_path),\n",
        "      components=components)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "mJbq07THU2GV"
      },
      "source": [
        "## Run the pipeline\n",
        "\n",
        "TFX supports multiple orchestrators to run pipelines.\n",
        "In this tutorial we will use `LocalDagRunner` which is included in the TFX\n",
        "Python package and runs pipelines on local environment.\n",
        "We often call TFX pipelines \"DAGs\" which stands for directed acyclic graph.\n",
        "\n",
        "`LocalDagRunner` provides fast iterations for developemnt and debugging.\n",
        "TFX also supports other orchestrators including Kubeflow Pipelines and Apache\n",
        "Airflow which are suitable for production use cases.\n",
        "\n",
        "See\n",
        "[TFX on Cloud AI Platform Pipelines](https://www.tensorflow.org/tfx/tutorials/tfx/cloud-ai-platform-pipelines)\n",
        "or\n",
        "[TFX Airflow Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/airflow_workshop)\n",
        "to learn more about other orchestration systems."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "7mp0AkmrPdUb"
      },
      "source": [
        "Now we create a `LocalDagRunner` and pass a `Pipeline` object created from the\n",
        "function we already defined.\n",
        "\n",
        "The pipeline runs directly and you can see logs for the progress of the pipeline including ML model training."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "fAtfOZTYWJu-"
      },
      "outputs": [],
      "source": [
        "tfx.orchestration.LocalDagRunner().run(\n",
        "  _create_pipeline(\n",
        "      pipeline_name=PIPELINE_NAME,\n",
        "      pipeline_root=PIPELINE_ROOT,\n",
        "      data_root=DATA_ROOT,\n",
        "      module_file=_trainer_module_file,\n",
        "      serving_model_dir=SERVING_MODEL_DIR,\n",
        "      metadata_path=METADATA_PATH))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ppERq0Mj6xvW"
      },
      "source": [
        "You should see \"INFO:absl:Component Pusher is finished.\" at the end of the\n",
        "logs if the pipeline finished successfully. Because `Pusher` component is the\n",
        "last component of the pipeline.\n",
        "\n",
        "The pusher component pushes the trained model to the `SERVING_MODEL_DIR` which\n",
        "is the `serving_model/penguin-simple` directory if you did not change the\n",
        "variables in the previous steps. You can see the result from the file browser\n",
        "in the left-side panel in Colab, or using the following command:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "NTHROkqX6yHx"
      },
      "outputs": [],
      "source": [
        "# List files in created model directory.\n",
        "!find {SERVING_MODEL_DIR}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "08R8qvweThRf"
      },
      "source": [
        "## Next steps\n",
        "\n",
        "You can find more resources on https://www.tensorflow.org/tfx/tutorials.\n",
        "\n",
        "Please see\n",
        "[Understanding TFX Pipelines](https://www.tensorflow.org/tfx/guide/understanding_tfx_pipelines)\n",
        "to learn more about various concepts in TFX.\n"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [
        "DjUA6S30k52h"
      ],
      "name": "penguin_simple.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
